Coverage Report

Created: 2026-04-21 11:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
D:\a\scloud-dns\scloud-dns\src\workers\manager\channels_generation.rs
Line
Count
Source
1
use crate::exceptions::SCloudException;
2
use crate::workers::{SCloudWorker, WorkerType};
3
use std::collections::HashMap;
4
use std::sync::Arc;
5
use tokio::sync::mpsc;
6
7
0
pub(crate) async fn generate_channels(
8
0
    workers: Vec<Arc<SCloudWorker>>,
9
0
) -> Result<(), SCloudException> {
10
0
    let mut wl: HashMap<&str, Vec<Arc<SCloudWorker>>> = HashMap::new();
11
0
    for w in workers {
12
0
        let key = match &w.get_worker_type() {
13
0
            WorkerType::LISTENER => "listener",
14
0
            WorkerType::DECODER => "decoder",
15
0
            WorkerType::QUERY_DISPATCHER => "query-dispatcher",
16
0
            WorkerType::CACHE_LOOKUP => "cache-lookup",
17
0
            WorkerType::ZONE_MANAGER => "zone-manager",
18
0
            WorkerType::RESOLVER => "resolver",
19
0
            WorkerType::CACHE_WRITER => "cache-writer",
20
0
            WorkerType::ENCODER => "encoder",
21
0
            WorkerType::SENDER => "sender",
22
0
            WorkerType::CACHE_JANITOR => "cache-janitor",
23
0
            WorkerType::METRICS => "metrics",
24
0
            WorkerType::TCP_ACCEPTOR => "tcp-acceptor",
25
0
            WorkerType::DOH_ACCEPTOR => "doh-acceptor",
26
0
            WorkerType::NONE => "none",
27
        };
28
0
        wl.entry(key).or_insert_with(Vec::new).push(Arc::clone(&w));
29
    }
30
31
0
    let default_worker = vec![Arc::new(SCloudWorker::new(WorkerType::NONE)?)];
32
0
    let tcp_acceptor = wl.get("tcp-acceptor").unwrap_or(&default_worker);
33
0
    let doh_acceptor = wl.get("doh-acceptor").cloned();
34
0
    let decoder = wl.get("decoder").unwrap_or(&default_worker);
35
0
    let query_dispatcher = wl.get("query-dispatcher").unwrap_or(&default_worker);
36
0
    let cache_lookup = wl.get("cache-lookup").unwrap_or(&default_worker);
37
0
    let cache_writers = wl.get("cache-writer").unwrap_or(&default_worker);
38
0
    let zone_manager = wl.get("zone-manager").unwrap_or(&default_worker);
39
0
    let resolvers = wl.get("resolver").unwrap_or(&default_worker);
40
0
    let encoders = wl.get("encoder").unwrap_or(&default_worker);
41
0
    let senders = wl.get("sender").unwrap_or(&default_worker);
42
43
    // Helper: wire N producers -> M consumers
44
    // Each producer gets M senders, each consumer gets N receivers
45
0
    async fn wire(
46
0
        producers: &[Arc<SCloudWorker>],
47
0
        consumers: &[Arc<SCloudWorker>],
48
0
        capacity: usize,
49
0
    ) {
50
0
        for p in producers {
51
0
            let mut txs = Vec::new();
52
0
            for c in consumers {
53
0
                let (tx, rx) = mpsc::channel(capacity);
54
0
                c.push_dns_rx(rx).await;
55
0
                txs.push(tx);
56
            }
57
0
            p.push_dns_tx_many(txs).await;
58
        }
59
0
    }
60
61
0
    wire(tcp_acceptor, decoder, 1024).await;
62
0
    if let Some(doh) = doh_acceptor.as_deref() {
63
0
        wire(doh, decoder, 1024).await;
64
0
    }
65
0
    wire(decoder, cache_lookup, 1024).await;
66
0
    wire(cache_lookup, cache_writers, 1024).await; // tx[0] = miss path
67
0
    wire(cache_lookup, query_dispatcher, 1024).await; // tx[1] = hit path
68
0
    wire(query_dispatcher, zone_manager, 1024).await;
69
0
    wire(query_dispatcher, resolvers, 1024).await;
70
0
    wire(zone_manager, cache_writers, 1024).await;
71
0
    wire(resolvers, cache_writers, 1024).await;
72
0
    wire(cache_writers, encoders, 1024).await;
73
0
    wire(encoders, senders, 1024).await;
74
75
0
    Ok(())
76
0
}